import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

# Use LakeSoul CDC Table Format

<!--
SPDX-FileCopyrightText: 2023 LakeSoul Contributors

SPDX-License-Identifier: Apache-2.0
-->

CDC (Change Data Capture) is an important data source for a lakehouse. The LakeSoul CDC table format syncs changes from an online OLTP database into LakeSoul with low latency, usually within several minutes, so that downstream analytics can see the newest results as soon as possible without a traditional T+1 database dump. Unlike a normal table, a CDC table also supports deleting rows.

LakeSoul models CDC data with an extra change operation column (the column name is configurable) and can consume CDC sources including [Debezium](https://debezium.io/), [canal](https://github.com/alibaba/canal), and [Flink CDC](https://github.com/ververica/flink-cdc-connectors). This format is not enabled by default, which means a normal table supports only upsert operations, not delete. To enable the CDC format, set an extra table property when creating the table.

To create a LakeSoul CDC table, add the table property `lakesoul_cdc_change_column`, whose value is the name of the column that records the change type. This column must be of `string` type and contain one of three values: `update`, `insert`, or `delete`.

When LakeSoul merges data automatically during a batch read (including Spark and Flink batch reads), the latest `insert` and `update` rows are kept and `delete` rows are filtered out. With Spark or Flink streaming incremental reads, the value of the CDC change column (`insert`, `update`, or `delete`) is retained. In Flink, this column is automatically converted to the corresponding `RowKind` value of the `RowData` object, so LakeSoul fully supports Flink Changelog Stream semantics during stream reads and can be used for incremental computation.
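
The difference between the two read modes can be sketched with plain Scala collections. This is only an illustrative model, not LakeSoul's implementation: `CdcMergeSketch`, `ChangeRow`, `batchRead`, and `incrementalRead` are hypothetical names, and the input is assumed to be ordered by commit time.

```scala
// Illustrative model only: plain Scala collections, no LakeSoul or Spark APIs.
object CdcMergeSketch {
  // One CDC record: primary key, payload, and the change column value.
  final case class ChangeRow(id: String, value: String, changeType: String)

  // Batch read: keep the latest change per primary key, then drop keys
  // whose latest change is a delete. Input is assumed to be in commit order.
  def batchRead(changes: Seq[ChangeRow]): Seq[ChangeRow] = {
    val latestByKey = changes.foldLeft(Map.empty[String, ChangeRow]) {
      (acc, row) => acc.updated(row.id, row) // later rows overwrite earlier ones
    }
    latestByKey.values.filter(_.changeType != "delete").toSeq.sortBy(_.id)
  }

  // Streaming incremental read: every change is passed through with its flag.
  def incrementalRead(changes: Seq[ChangeRow]): Seq[ChangeRow] = changes

  val exampleChanges: Seq[ChangeRow] = Seq(
    ChangeRow("1", "a", "insert"),
    ChangeRow("2", "b", "insert"),
    ChangeRow("1", "a2", "update"),
    ChangeRow("2", "b", "delete")
  )
}
```

With `exampleChanges`, `batchRead` returns only the row for key `1` with value `a2`, because the latest change for key `2` is a `delete`. `incrementalRead` returns all four rows, including the `delete`.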


## Create LakeSoul Table with CDC Format

### In Spark
Use the Scala API or SQL. The examples assume the change operation column is named `change_type`:

<Tabs
    defaultValue="Scala"
    values={[
        {label: 'Scala', value: 'Scala'},
        {label: 'SQL', value: 'SQL'},
    ]}>
<TabItem value="Scala">

  ```scala
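  import com.dmetasoul.lakesoul.tables.LakeSoulTable

  // `data` is the DataFrame that defines the schema; `path` is the table location.
  LakeSoulTable.createTable(data, path)
    .shortTableName("cdc_ingestion")
    .hashPartitions("id")
    .hashBucketNum(2)
    .rangePartitions("rangeid")
    // Enable the CDC format and name the change operation column.
    .tableProperty("lakesoul_cdc_change_column" -> "change_type")
    .create()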
  ```

</TabItem>
<TabItem value="SQL">

  ```sql
  CREATE TABLE table_name (id string, date string, change_type string) USING lakesoul
    PARTITIONED BY (date)
    LOCATION 's3://lakesoul-bucket/table_path'
    TBLPROPERTIES('lakesoul_cdc_change_column'='change_type',
      'hashPartitions'='id',
      'hashBucketNum'='2');
  ```

</TabItem>
</Tabs>

Note that a LakeSoul CDC ingestion table must have primary key(s), and they should be the same as those of the online OLTP table.

### In Flink
Please refer to [Flink Connector](../03-Usage%20Docs/06-flink-lakesoul-connector.md).

## Incremental Read for LakeSoul CDC Table

LakeSoul uses primary key sharding for incremental upserts, so incremental data does not need to be merged with existing data at write time. For CDC tables, the incremental data is the content of the original CDC stream. A CDC incremental read of a LakeSoul table fully retains the CDC operation flags: `insert`, `update`, and `delete`.

Version 2.2.0 supports incremental streaming reading in Spark.

From version 2.3.0, Flink Table Source is supported, with streaming incremental reads and writes that follow Flink Changelog Stream semantics. Please refer to [Flink Connector](../03-Usage%20Docs/06-flink-lakesoul-connector.md).